aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <cloud0fan@outlook.com>2015-09-14 11:51:39 -0700
committerYin Huai <yhuai@databricks.com>2015-09-14 11:51:39 -0700
commit32407bfd2bdbf84d65cacfa7554dae6a2332bc37 (patch)
tree2b8ecb4d07e4f5290b59039002f58e4be58f58d6 /sql
parentd81565465cc6d4f38b4ed78036cded630c700388 (diff)
downloadspark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.tar.gz
spark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.tar.bz2
spark-32407bfd2bdbf84d65cacfa7554dae6a2332bc37.zip
[SPARK-9899] [SQL] log warning for direct output committer with speculation enabled
This is a follow-up of https://github.com/apache/spark/pull/8317. When speculation is enabled, there may be multiple tasks writing to the same path. Generally it's OK as we will write to a temporary directory first and only one task can commit the temporary directory to target path. However, when we use a direct output committer, tasks will write data to target path directly without a temporary directory. This causes problems like corrupted data. Please see [PR comment](https://github.com/apache/spark/pull/8191#issuecomment-131598385) for more details. Unfortunately, we don't have a simple flag to tell if an output committer will write to temporary directory or not, so for safety, we have to disable any customized output committer when `speculation` is true. Author: Wenchen Fan <cloud0fan@outlook.com> Closes #8687 from cloud-fan/direct-committer.
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala1
2 files changed, 15 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 58f7fa640e..0c700bdb37 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -62,7 +62,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
- def saveAsHiveFile(
+ private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
@@ -178,6 +178,19 @@ case class InsertIntoHiveTable(
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may lose data if they are using a direct output committer.
+ val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 29a6f08f40..4ca8042d22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils