aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala17
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala1
2 files changed, 15 insertions, 3 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 58f7fa640e..0c700bdb37 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -62,7 +62,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
- def saveAsHiveFile(
+ private def saveAsHiveFile(
rdd: RDD[InternalRow],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
@@ -178,6 +178,19 @@ case class InsertIntoHiveTable(
val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use a output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
val writerContainer = if (numDynamicPartitions > 0) {
val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 29a6f08f40..4ca8042d22 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapred._
import org.apache.hadoop.hive.common.FileUtils
import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.Row
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils