author    Cheng Lian <lian@databricks.com>    2015-08-18 00:59:05 +0800
committer Cheng Lian <lian@databricks.com>    2015-08-18 00:59:05 +0800
commit    76c155dd4483d58499e5cb66e5e9373bb771dbeb (patch)
tree      1c084242d19984c16ff10985190b6fd65fa1ec5a
parent    f7efda3975d46a8ce4fd720b3730127ea482560b (diff)
[SPARK-7837] [SQL] Avoids double closing output writers when commitTask() fails
When inserting data into a `HadoopFsRelation`, if `commitTask()` of the writer container fails, `abortTask()` will be invoked. However, both `commitTask()` and `abortTask()` try to close the output writer(s). The problem is that closing the underlying writers may not be an idempotent operation; e.g., `ParquetRecordWriter.close()` throws an NPE when called twice.

Author: Cheng Lian <lian@databricks.com>

Closes #8236 from liancheng/spark-7837/double-closing.
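The fix guards every `close()` call with a task-local boolean flag so the writer is closed at most once, even when `abortTask()` runs right after a failed `commitTask()`. The following is a minimal sketch of that guard pattern in isolation; `SimpleWriter` and `GuardedTask` are hypothetical stand-ins for Spark's writer classes, and only the flag logic mirrors the patch below.

// Hypothetical stand-in for an output writer whose close() may not be
// idempotent (ParquetRecordWriter.close(), for example, NPEs on a second call).
trait SimpleWriter {
  def write(row: String): Unit
  def close(): Unit
}

class GuardedTask(writer: SimpleWriter) {
  private var writerClosed = false

  // Close the writer at most once, regardless of how many callers reach this point.
  private def closeWriterOnce(): Unit = {
    if (!writerClosed) {
      writer.close()
      writerClosed = true
    }
  }

  def commitTask(): Unit = {
    closeWriterOnce()
    // Commit bookkeeping would follow here; if it throws, the task runner
    // is expected to call abortTask() next.
  }

  def abortTask(): Unit = {
    closeWriterOnce() // safe even if commitTask() already closed the writer
  }
}

Without the flag, a `commitTask()` that closes the writer and then fails would be followed by an `abortTask()` that closes it again, which is exactly the double-close NPE described above.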
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala       | 21
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala | 46
2 files changed, 61 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index d36197e50d..e0147079e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -217,6 +217,8 @@ private[sql] class DefaultWriterContainer(
val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
writer.initConverter(dataSchema)
+ var writerClosed = false
+
// If anything below fails, we should abort the task.
try {
while (iterator.hasNext) {
@@ -235,7 +237,10 @@ private[sql] class DefaultWriterContainer(
def commitTask(): Unit = {
try {
assert(writer != null, "OutputWriter instance should have been initialized")
- writer.close()
+ if (!writerClosed) {
+ writer.close()
+ writerClosed = true
+ }
super.commitTask()
} catch {
case cause: Throwable =>
@@ -247,7 +252,10 @@ private[sql] class DefaultWriterContainer(
def abortTask(): Unit = {
try {
- writer.close()
+ if (!writerClosed) {
+ writer.close()
+ writerClosed = true
+ }
} finally {
super.abortTask()
}
@@ -275,6 +283,8 @@ private[sql] class DynamicPartitionWriterContainer(
val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
executorSideSetup(taskContext)
+ var outputWritersCleared = false
+
// Returns the partition key given an input row
val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
// Returns the data columns to be written given an input row
@@ -379,8 +389,11 @@ private[sql] class DynamicPartitionWriterContainer(
}
def clearOutputWriters(): Unit = {
- outputWriters.asScala.values.foreach(_.close())
- outputWriters.clear()
+ if (!outputWritersCleared) {
+ outputWriters.asScala.values.foreach(_.close())
+ outputWriters.clear()
+ outputWritersCleared = true
+ }
}
def commitTask(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d819f3ab5e..e6b0a2ea95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -424,7 +424,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
configuration.set(
"spark.sql.parquet.output.committer.class",
- classOf[BogusParquetOutputCommitter].getCanonicalName)
+ classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
try {
val message = intercept[SparkException] {
@@ -450,12 +450,54 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}.toString
assert(errorMessage.contains("UnknownHostException"))
}
+
+ test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+ val clonedConf = new Configuration(configuration)
+
+ // Use an output committer that always fails when committing a task, so that both
+ // `commitTask()` and `abortTask()` are invoked.
+ configuration.set(
+ "spark.sql.parquet.output.committer.class",
+ classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+
+ try {
+ // Before fixing SPARK-7837, the following code results in an NPE because both
+ // `commitTask()` and `abortTask()` try to close output writers.
+
+ withTempPath { dir =>
+ val m1 = intercept[SparkException] {
+ sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(m1.contains("Intentional exception for testing purposes"))
+ }
+
+ withTempPath { dir =>
+ val m2 = intercept[SparkException] {
+ val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+ df.write.partitionBy("a").parquet(dir.getCanonicalPath)
+ }.getCause.getMessage
+ assert(m2.contains("Intentional exception for testing purposes"))
+ }
+ } finally {
+ // Hadoop 1 doesn't have `Configuration.unset`
+ configuration.clear()
+ clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+ }
+ }
}
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
override def commitJob(jobContext: JobContext): Unit = {
sys.error("Intentional exception for testing purposes")
}
}
+
+class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+ extends ParquetOutputCommitter(outputPath, context) {
+
+ override def commitTask(context: TaskAttemptContext): Unit = {
+ sys.error("Intentional exception for testing purposes")
+ }
+}