From 76c155dd4483d58499e5cb66e5e9373bb771dbeb Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 18 Aug 2015 00:59:05 +0800
Subject: [SPARK-7837] [SQL] Avoids double closing output writers when commitTask() fails

When inserting data into a `HadoopFsRelation`, if `commitTask()` of the
writer container fails, `abortTask()` will be invoked. However, both
`commitTask()` and `abortTask()` try to close the output writer(s). The
problem is that closing underlying writers may not be an idempotent
operation. E.g., `ParquetRecordWriter.close()` throws an NPE when called
twice.

Author: Cheng Lian

Closes #8236 from liancheng/spark-7837/double-closing.
---
 .../execution/datasources/WriterContainer.scala   | 21 ++++++++--
 .../datasources/parquet/ParquetIOSuite.scala      | 46 +++++++++++++++++++++-
 2 files changed, 61 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index d36197e50d..e0147079e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -217,6 +217,8 @@ private[sql] class DefaultWriterContainer(
     val writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
     writer.initConverter(dataSchema)
 
+    var writerClosed = false
+
     // If anything below fails, we should abort the task.
     try {
       while (iterator.hasNext) {
@@ -235,7 +237,10 @@
     def commitTask(): Unit = {
       try {
         assert(writer != null, "OutputWriter instance should have been initialized")
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
         super.commitTask()
       } catch {
         case cause: Throwable =>
@@ -247,7 +252,10 @@
 
     def abortTask(): Unit = {
       try {
-        writer.close()
+        if (!writerClosed) {
+          writer.close()
+          writerClosed = true
+        }
       } finally {
         super.abortTask()
       }
@@ -275,6 +283,8 @@ private[sql] class DynamicPartitionWriterContainer(
     val outputWriters = new java.util.HashMap[InternalRow, OutputWriter]
     executorSideSetup(taskContext)
 
+    var outputWritersCleared = false
+
     // Returns the partition key given an input row
     val getPartitionKey = UnsafeProjection.create(partitionColumns, inputSchema)
     // Returns the data columns to be written given an input row
@@ -379,8 +389,11 @@
     }
 
     def clearOutputWriters(): Unit = {
-      outputWriters.asScala.values.foreach(_.close())
-      outputWriters.clear()
+      if (!outputWritersCleared) {
+        outputWriters.asScala.values.foreach(_.close())
+        outputWriters.clear()
+        outputWritersCleared = true
+      }
     }
 
     def commitTask(): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d819f3ab5e..e6b0a2ea95 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -424,7 +424,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     configuration.set(
       "spark.sql.parquet.output.committer.class",
-      classOf[BogusParquetOutputCommitter].getCanonicalName)
+      classOf[JobCommitFailureParquetOutputCommitter].getCanonicalName)
 
     try {
       val message = intercept[SparkException] {
@@ -450,12 +450,54 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }.toString
     assert(errorMessage.contains("UnknownHostException"))
   }
+
+  test("SPARK-7837 Do not close output writer twice when commitTask() fails") {
+    val clonedConf = new Configuration(configuration)
+
+    // Using an output committer that always fails when committing a task, so that both
+    // `commitTask()` and `abortTask()` are invoked.
+    configuration.set(
+      "spark.sql.parquet.output.committer.class",
+      classOf[TaskCommitFailureParquetOutputCommitter].getCanonicalName)
+
+    try {
+      // Before fixing SPARK-7837, the following code results in an NPE because both
+      // `commitTask()` and `abortTask()` try to close output writers.
+
+      withTempPath { dir =>
+        val m1 = intercept[SparkException] {
+          sqlContext.range(1).coalesce(1).write.parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m1.contains("Intentional exception for testing purposes"))
+      }
+
+      withTempPath { dir =>
+        val m2 = intercept[SparkException] {
+          val df = sqlContext.range(1).select('id as 'a, 'id as 'b).coalesce(1)
+          df.write.partitionBy("a").parquet(dir.getCanonicalPath)
+        }.getCause.getMessage
+        assert(m2.contains("Intentional exception for testing purposes"))
+      }
+    } finally {
+      // Hadoop 1 doesn't have `Configuration.unset`
+      configuration.clear()
+      clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
+    }
+  }
 }
 
-class BogusParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
 
   override def commitJob(jobContext: JobContext): Unit = {
     sys.error("Intentional exception for testing purposes")
   }
 }
+
+class TaskCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
+  extends ParquetOutputCommitter(outputPath, context) {
+
+  override def commitTask(context: TaskAttemptContext): Unit = {
+    sys.error("Intentional exception for testing purposes")
+  }
+}
-- 
cgit v1.2.3
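
Note (not part of the patch): the core of the fix is a guard flag that makes closing the writer idempotent, so abortTask() can run safely after a failed commitTask(). Below is a minimal standalone sketch of that pattern; all names (FragileWriter, TaskWriterContainer, closeWriterOnce) are hypothetical and only illustrate the technique, they do not come from the Spark sources.

object CloseOnceSketch extends App {
  // Stand-in for an output writer whose close() is not idempotent
  // (e.g. ParquetRecordWriter throws an NPE when closed a second time).
  class FragileWriter {
    private var closed = false
    def close(): Unit = {
      require(!closed, "close() called twice")
      closed = true
    }
  }

  class TaskWriterContainer(writer: FragileWriter) {
    private var writerClosed = false

    // Both the commit and abort paths funnel through this helper, so the
    // underlying writer is closed at most once.
    private def closeWriterOnce(): Unit =
      if (!writerClosed) {
        writer.close()
        writerClosed = true
      }

    def commitTask(): Unit = {
      closeWriterOnce()
      // Simulate a committer that fails after the writer has been closed.
      sys.error("Intentional exception for testing purposes")
    }

    def abortTask(): Unit = closeWriterOnce()
  }

  val container = new TaskWriterContainer(new FragileWriter)
  try container.commitTask()
  catch {
    // With the guard flag, the abort path does not close the writer again.
    case _: Throwable => container.abortTask()
  }
  println("abortTask() after a failed commitTask() did not close the writer twice")
}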