author    Cheng Lian <lian@databricks.com>  2015-05-26 00:28:47 +0800
committer Cheng Lian <lian@databricks.com>  2015-05-26 00:28:47 +0800
commit    8af1bf10b70b9b67f18f618174e84365d69caa48 (patch)
tree      a9cec94f9c6c8e9ed91d8f90fcedb5bd42b73303 /sql/hive
parent    bfeedc69a29a1dfbfc520545e3fc95389ea1b82d (diff)
download  spark-8af1bf10b70b9b67f18f618174e84365d69caa48.tar.gz
          spark-8af1bf10b70b9b67f18f618174e84365d69caa48.tar.bz2
          spark-8af1bf10b70b9b67f18f618174e84365d69caa48.zip
[SPARK-7842] [SQL] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust
When committing or aborting a write task issued in `InsertIntoHadoopFsRelation`, an exception thrown from `OutputWriter.close()` interrupts the commit/abort process and leaves messy state behind (e.g., the `_temporary` directory created by `FileOutputCommitter`). This PR makes both processes more robust by catching potential exceptions and falling back to a normal task commit/abort.

Author: Cheng Lian <lian@databricks.com>

Closes #6378 from liancheng/spark-7838 and squashes the following commits:

f18253a [Cheng Lian] Makes task committing/aborting in InsertIntoHadoopFsRelation more robust
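A minimal sketch of the hardening the commit message describes, assuming illustrative names (OutputWriter, TaskCommitter, closeAndCommit are placeholders, not the actual Spark internals): if closing the writer throws, the task is still aborted so its temporary output is cleaned up instead of being left behind by an interrupted commit.

    // Sketch only: shows the catch-and-fall-back pattern, not Spark's real code paths.
    object RobustCommitSketch {
      trait OutputWriter { def close(): Unit }
      trait TaskCommitter { def commitTask(): Unit; def abortTask(): Unit }

      def closeAndCommit(writer: OutputWriter, committer: TaskCommitter): Unit = {
        try {
          writer.close()          // may throw, e.g. while flushing buffered records
          committer.commitTask()  // normal path: promote the task's temporary output
        } catch {
          case cause: Throwable =>
            // Fall back to a normal abort so _temporary files are removed,
            // then surface the original failure to the caller.
            try committer.abortTask()
            finally throw new RuntimeException("Task failed while writing rows", cause)
        }
      }
    }

The test data source added below exercises exactly this path by making close() fail unconditionally.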
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala      42
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala  22
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 2d69b89fd9..de907846b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Row, SQLContext}
/**
@@ -67,7 +67,9 @@ class SimpleTextOutputWriter(path: String, context: TaskAttemptContext) extends
recordWriter.write(null, new Text(serialized))
}
- override def close(): Unit = recordWriter.close(context)
+ override def close(): Unit = {
+ recordWriter.close(context)
+ }
}
/**
@@ -120,3 +122,39 @@ class SimpleTextRelation(
}
}
}
+
+/**
+ * A simple example [[HadoopFsRelationProvider]].
+ */
+class CommitFailureTestSource extends HadoopFsRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ paths: Array[String],
+ schema: Option[StructType],
+ partitionColumns: Option[StructType],
+ parameters: Map[String, String]): HadoopFsRelation = {
+ new CommitFailureTestRelation(paths, schema, partitionColumns, parameters)(sqlContext)
+ }
+}
+
+class CommitFailureTestRelation(
+ override val paths: Array[String],
+ maybeDataSchema: Option[StructType],
+ override val userDefinedPartitionColumns: Option[StructType],
+ parameters: Map[String, String])(
+ @transient sqlContext: SQLContext)
+ extends SimpleTextRelation(
+ paths, maybeDataSchema, userDefinedPartitionColumns, parameters)(sqlContext) {
+ override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new SimpleTextOutputWriter(path, context) {
+ override def close(): Unit = {
+ sys.error("Intentional task commitment failure for testing purposes.")
+ }
+ }
+ }
+ }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 32226905bc..70328e1ef8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.sources
import org.apache.hadoop.fs.Path
+import org.scalatest.FunSuite
+import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHive
@@ -477,6 +479,26 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
+class CommitFailureTestRelationSuite extends FunSuite with SQLTestUtils {
+ import TestHive.implicits._
+
+ override val sqlContext = TestHive
+
+ val dataSourceName: String = classOf[CommitFailureTestSource].getCanonicalName
+
+ test("SPARK-7684: commitTask() failure should fallback to abortTask()") {
+ withTempPath { file =>
+ val df = (1 to 3).map(i => i -> s"val_$i").toDF("a", "b")
+ intercept[SparkException] {
+ df.write.format(dataSourceName).save(file.getCanonicalPath)
+ }
+
+ val fs = new Path(file.getCanonicalPath).getFileSystem(SparkHadoopUtil.get.conf)
+ assert(!fs.exists(new Path(file.getCanonicalPath, "_temporary")))
+ }
+ }
+}
+
class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName